#include "config.h"

#include <sys/statvfs.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/statfs.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "cluster.h"
#include "squeue.h"
#include "cache.h"
#include "net_global.h"
#include "disk.h"
#include "../replica/replica.h"
#include "../controller/md_proto.h"
#include "chunk_bh.h"
#include "chunk_ops.h"
#include "chunk_proto.h"
#include "chunk_cleanup.h"
#include "lich_md.h"
#include "job_dock.h"
#include "dbg.h"

extern int node_get_deleting();

static int __replica_read(const nid_t *nid, const chkid_t *chkid, const vclock_t *vclock, buffer_t *buf, int flags)
{
        int ret;
        io_t io;

        io_init(&io, chkid, vclock, 0, LICH_CHUNK_SPLIT, flags);
        if (net_islocal(nid)) {
                ret = replica_srv_read(nid, &io, buf);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                ret = replica_rpc_read(nid, &io, buf);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

static int __chunk_getparent(const nid_t *nid, const chkid_t *chkid, fileid_t *parent)
{
        int ret;

        if (net_islocal(nid)) {
                ret = replica_srv_getparent(chkid, parent, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = replica_rpc_getparent(nid, chkid, parent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __chunk_push(const char *pool, const nid_t *nid, const chkid_t *chkid, const fileid_t *parent,
                        int tier, const vclock_t *vclock, uint64_t meta_version, const buffer_t *buf, int flags)
{
        int ret;

        if (net_islocal(nid)) {
                ret = replica_srv_push(pool, net_getnid(), chkid, parent, tier, vclock,
                                       meta_version, buf, flags);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = replica_rpc_push(pool, nid, chkid, parent, tier, vclock, meta_version, buf, flags);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * 以一副本from覆盖另一副本to
 *
 * @param pool
 * @param chkid
 * @param from
 * @param to
 * @param clock
 * @param meta_version
 * @param _parent
 * @param tier
 * @param flags
 * @return
 */
int chunk_push(const char *pool, const chkid_t *chkid, const nid_t *from, const nid_t *to,
               const vclock_t *vclock, uint64_t meta_version, const fileid_t *_parent,
               int tier, int flags, const char *context)
{
        int ret;
        buffer_t _buf;
        const fileid_t *parent;
        fileid_t tmp;

        ANALYSIS_BEGIN(0);
        
#if ENABLE_CHUNK_DEBUG
        DINFO("push pool %s "CHKID_FORMAT" from %s to %s, meta_version %ju flags:%x "
              VCLOCK_FORMAT" context %s\n", pool, CHKID_ARG(chkid),
              network_rname(from), network_rname(to), meta_version, flags, VCLOCK_ARG(vclock), context);
#else
        DBUG("push pool %s "CHKID_FORMAT" from %s to %s, meta_version %ju flags:%x "
             VCLOCK_FORMAT" context %s\n", pool, CHKID_ARG(chkid),
             network_rname(from), network_rname(to), meta_version, flags, VCLOCK_ARG(vclock), context);
#endif

        if (_parent == NULL) {
                ret = __chunk_getparent(from, chkid, &tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &tmp;
        } else
                parent = _parent;

        mbuffer_init(&_buf, 0);
        ret = __replica_read(from, chkid, vclock, &_buf, flags);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_push_read");

        ANALYSIS_BEGIN(1);

        ret = __chunk_push(pool, to, chkid, parent, tier, vclock, meta_version, &_buf, flags);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ANALYSIS_QUEUE(1, IO_WARN, "chunk_push_write");
        
        mbuffer_free(&_buf);

        return 0;
err_free:
        mbuffer_free(&_buf);
err_ret:
        return ret;
}

int chunk_push_with(const char *pool, const chkid_t *chkid, const buffer_t *buf, const nid_t *to,
               const vclock_t *vclock, uint64_t meta_version, const fileid_t *parent,
               int tier, int flags)
{
        int ret;

        YASSERT(parent);
        DBUG("push "CHKID_FORMAT" chunk to %s, meta_version %llu\n", CHKID_ARG(chkid),
              network_rname(to), (LLU)meta_version);

        ret = __chunk_push(pool, to, chkid, parent, tier, vclock, meta_version, buf, flags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __chunk_sync_newinfo(const char *pool, chkinfo_t *chkinfo, int bad)
{
        int ret, idx, i;
        diskid_t skip[LICH_REPLICA_MAX + 1], newnid;

        idx = 0;
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (i == bad) {
                        continue;
                } else {
                        skip[idx] = chkinfo->diskid[i].id;
                        idx++;
                }
        }

        ret = dispatch_newdisk2(pool, &newnid, skip, idx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(newnid.id);
        for (i = 0; i < idx; i++) {
                YASSERT(nid_cmp(&newnid, &skip[i]));
        }

        chkinfo_replace(chkinfo, bad, &newnid);

#if ENABLE_CHUNK_DEBUG
        chkinfo_rack_check(chkinfo);
#else
#endif
        
        return 0;
err_ret:
        return ret;
}

static int __chunk_sync_getparent(const nid_t *nid, const chkid_t *chkid, fileid_t *parent)
{
        int ret;

        if (chkid->type == __RAW_CHUNK__) {
                cid2fid(parent, chkid);
                return 0;
        }

        if (net_islocal(nid)) {
                ret = replica_srv_getparent(chkid, parent, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = replica_rpc_getparent(nid, chkid, parent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int chunk_push_newdisk(const char *pool, chkinfo_t *chkinfo, int _from, int _to, const vclock_t *vclock,
                       const fileid_t *_parent, int tier, int flags, int *oflags)
{
        int ret;
        time_t ltime;
        const nid_t *from, *to;
        const fileid_t *parent;
        fileid_t fileid;
        chkinfo_t *newinfo;
        char _chkinfo[CHKINFO_MAX];

        newinfo = (void *)_chkinfo;
        CHKINFO_CP(newinfo, chkinfo);
        from = &newinfo->diskid[_from].id;
        to = &newinfo->diskid[_to].id;

        if (_parent == NULL) {
                ret = __chunk_sync_getparent(from, &newinfo->id, &fileid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &fileid;
        } else {
                parent = _parent;
        }

        ret = __chunk_sync_newinfo(pool, newinfo, _to);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG(CHKID_FORMAT" sync %s -> %s\n", CHKID_ARG(&chkinfo->id),
             network_rname(from), network_rname(to));
        
        YASSERT(nid_cmp(from, to));
        ret = network_connect(to, &ltime, 2, 1);
        if (unlikely(ret)) {
                DWARN("connect %s fail\n", network_rname(to));
                GOTO(err_ret, ret);
        }

        *oflags = 1;

        ret = chunk_push(pool, &newinfo->id, from, to, vclock, newinfo->info_version,
                         parent, tier, flags, "new");
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_BALANCE
        // volume controller move, sync to master
        if (chkinfo->id.type == __VOLUME_CHUNK__ && _from == 0) {
                stor_mkvol_sync2master(network_rname(&newinfo->diskid[0].id), &newinfo->id);
        }
#endif

        CHKINFO_CP(chkinfo, newinfo);

        return 0;
err_ret:
        return ret;
}

int chunk_push_newdisk_with(const char *pool, chkinfo_t *chkinfo, const buffer_t *buf, int _to, const vclock_t *vclock,
                       const fileid_t *parent, int tier, int flags)
{
        int ret;
        time_t ltime;
        const nid_t *to;
        chkinfo_t *newinfo;
        char _chkinfo[CHKINFO_MAX];

        newinfo = (void *)_chkinfo;
        CHKINFO_CP(newinfo, chkinfo);
        to = &newinfo->diskid[_to].id;

        ret = __chunk_sync_newinfo(pool, newinfo, _to);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = network_connect(to, &ltime, 2, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = chunk_push_with(pool, &newinfo->id, buf, to, vclock, newinfo->info_version,
                         parent, tier, flags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_CP(chkinfo, newinfo);

        return 0;
err_ret:
        return ret;
}
